#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>
#include <sys/mman.h>
#include <types.h>

#define DBG_SUBSYS S_LIBTASK

#include "adt.h"
#include "sysy_lib.h"
#include "lich_md.h"
#include "balance.h"
#include "dbg.h"

#define JOBS_SLICE 10000
#define BLANCE_MAX_DIFF 5

// raw_list element: one replica slot of one chunk.
// Entries start on balance_t.raw_list and are moved (list_move) onto a
// b_node_t.raw_list once a node has been chosen for them.
typedef struct {
        // list linkage; must stay the first member because the code casts
        // struct list_head * directly to b_raw_t *
        struct list_head hook;
        // chunk this replica belongs to
        chkid_t chkid;
        // replica index within the chunk (0 .. repnum - 1)
        int replica;
        // copied from chkinfo->diskid[replica].id when the entry is built
        nid_t nid;
        // filled by disk2rack() from the replica's disk name
        char rack[HOST_NAME_MAX];
        // filled by disk2node() from the replica's disk name
        char node[HOST_NAME_MAX];
} b_raw_t;

// node_list element: one candidate node that replicas can be placed on.
typedef struct {
        // list linkage; must stay the first member because the code casts
        // struct list_head * directly to b_node_t *
        struct list_head hook;
        // node id taken from the node's stat info
        nid_t nid;
        // number of b_raw_t entries assigned to this node so far; it is
        // incremented/decremented together with every move onto/off raw_list
        int weight;
        // filled by disk2rack() from the node name
        char rack[HOST_NAME_MAX];
        // filled by disk2node() from the node name
        char node[HOST_NAME_MAX];
        // b_raw_t entries currently assigned to this node
        struct list_head raw_list;
} b_node_t;

// rack_list element: one distinct rack name found among the nodes of
// node_list.
typedef struct {
        // list linkage; must stay the first member because the code casts
        // struct list_head * directly to b_rack_t *
        struct list_head hook;
        // rack name, copied from b_node_t.rack
        char rack[HOST_NAME_MAX];
} b_rack_t;

// job_list element: the placement result for one chunk.
typedef struct {
        // list linkage; must stay the first member because the code casts
        // struct list_head * directly to b_job_t *
        struct list_head hook;
        // chunk this job describes
        chkid_t chkid;
        // set by __gen_jobs(): 1 when no replica location changed and the
        // replica count is unchanged; such jobs are skipped by __write_jobs()
        uint16_t nochange;
        // replica count reported by the chunk's metadata when it was scanned
        int repnum_old;
        // target replica count (copied from balance_t.repnum)
        int dists_count;
        // per-replica destination; seeded from the chunk's current diskid[]
        // and overwritten by __gen_jobs() with the chosen node ids
        nid_t dists[LICH_REPLICA_MAX];
} b_job_t;

// Working context for balancing one slice of one volume.
typedef struct {
        // volume chkid
        chkid_t chkid;
        // target replica count for the chunks being balanced
        int repnum;

        // list and counter
        // (each *_count is incremented when an element is appended to the
        // matching list during initialisation)
        int rack_count;
        int node_count;
        int raw_count;
        int job_count;
        // b_rack_t elements
        struct list_head rack_list;
        // b_node_t elements
        struct list_head node_list;
        // b_raw_t elements that have not been assigned to a node yet
        struct list_head raw_list;
        // b_job_t elements
        struct list_head job_list;

        // policies
        // result of is_volume_localize() for the volume
        int localize;
        // node holding replica 0 of the volume chunk
        char local_node[HOST_NAME_MAX];
        // site reported by md_chunk_getsite() for the volume
        char site[MAX_NAME_LEN];
        // rack of the i-th replica of the volume chunk; only the first
        // chkinfo->repnum entries are filled in by __init_balance()
        char rack_of_vol[LICH_REPLICA_MAX][HOST_NAME_MAX];
} balance_t;


/**
 * Make sure a disk/host name carries a "/<index>" suffix.
 *
 * When @disk contains no '/', "/0" is appended in place. The new name is
 * assembled in a HOST_NAME_MAX scratch buffer, so it is truncated to
 * HOST_NAME_MAX - 1 characters. A name that already contains '/' is left
 * untouched.
 *
 * @param disk NUL-terminated name, modified in place; the caller must
 *             provide room for the two extra characters
 * @return always 0
 */
static int __make_sure_disk_forv3(char *disk)
{
        char suffixed[HOST_NAME_MAX];

        // already in "name/index" form, nothing to do
        if (strchr(disk, '/') != NULL)
                return 0;

        snprintf(suffixed, sizeof(suffixed), "%s/0", disk);
        strcpy(disk, suffixed);

        return 0;
}

#if 0
/*
 * Compiled out. Counterpart of __make_sure_disk_forv3(): instead of
 * appending a suffix it cuts @disk at the first '/', in place, and leaves
 * names without '/' unchanged. Always returns 0.
 */
static int __make_sure_disk_forv4(char *disk)
{
        char newdisk[HOST_NAME_MAX];
        char split = '/';
        char *pos;

        strcpy(newdisk, disk);

        pos = strchr(newdisk, split);
        if (pos) {
                // terminate the copy at the separator, then write it back
                *pos = '\0';
                strcpy(disk, newdisk);
        } else {
        }

        return 0;
}
#endif

int __free_balance(balance_t *balance)
{
        struct list_head *rack_pos, *node_pos, *raw_pos, *job_pos, *n, *m;
        b_node_t *node;

        list_for_each_safe(rack_pos, n, &balance->rack_list) {
                list_del(rack_pos);
                yfree((void **)&rack_pos);
        }

        list_for_each_safe(raw_pos, n, &balance->raw_list) {
                list_del(raw_pos);
                yfree((void **)&raw_pos);
        }

        list_for_each_safe(job_pos, n, &balance->job_list) {
                list_del(job_pos);
                yfree((void **)&job_pos);
        }

        list_for_each_safe(node_pos, n, &balance->node_list) {
                node = (void *)node_pos;
                list_for_each_safe(raw_pos, m, &node->raw_list) {
                        list_del(raw_pos);
                        yfree((void **)&raw_pos);
                }
                list_del(node_pos);
                yfree((void **)&node_pos);
        }

        return 0;
}

/**
 * Serialise every job that actually changes something to arg->fd.
 *
 * One res_t record is written per job whose nochange flag is clear;
 * arg->balance is incremented for each record written.
 *
 * @param balance context holding job_list
 * @param arg     output descriptor (arg->fd) and record counter (arg->balance)
 * @return 0 on success; errno of the failing write(), or EIO when the
 *         record was only partially written
 */
int __write_jobs(balance_t *balance, arg_t *arg)
{
        int ret, i;
        ssize_t written;
        b_job_t *job;
        struct list_head *job_pos;
        res_t res;

        list_for_each(job_pos, &balance->job_list) {
                job = (void *)job_pos;
                if (job->nochange) {
                        continue;
                }

                // start from a zeroed record so that unused disks[] slots and
                // padding do not leak stack garbage into the output file
                memset(&res, 0, sizeof(res));
                res.chkid = job->chkid;
                res.repnum = job->dists_count;

                for (i = 0; i < job->dists_count; i++) {
                        res.disks[i] = job->dists[i];
                }

                written = write(arg->fd, &res, sizeof(res));
                if (written != (ssize_t)sizeof(res)) {
                        // errno is only meaningful when write() returned -1;
                        // a short write is reported as EIO
                        ret = (written < 0) ? errno : EIO;
                        DWARN("write job fail, ret: %d\n", ret);
                        GOTO(err_ret, ret);
                }
                arg->balance++;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Turn the per-node raw assignments into job destinations.
 *
 * For every job the node lists are scanned for raws of the same chunk
 * (at most one per node). Each match fixes job->dists[raw->replica] to
 * that node and the raw is unlinked and freed. A job is flagged
 * nochange only when no destination differs from its previous value and
 * the old replica count equals the number of matches.
 *
 * @param balance context with job_list and populated node raw_lists
 * @return 0 on success, ENOSPC when a job has fewer assigned replicas
 *         than job->dists_count
 */
int __gen_jobs(balance_t *balance)
{
        int ret, matched, changed;
        struct list_head *job_pos, *node_pos, *raw_pos, *raw_next;
        b_job_t *job;
        b_node_t *node;
        b_raw_t *raw;

        list_for_each(job_pos, &balance->job_list) {
                job = (b_job_t *)job_pos;
                matched = 0;
                changed = 0;

                list_for_each(node_pos, &balance->node_list) {
                        node = (b_node_t *)node_pos;

                        list_for_each_safe(raw_pos, raw_next, &node->raw_list) {
                                raw = (b_raw_t *)raw_pos;
                                if (chkid_cmp(&raw->chkid, &job->chkid) != 0)
                                        continue;

                                if (nid_cmp(&job->dists[raw->replica], &node->nid) != 0) {
                                        job->dists[raw->replica] = node->nid;
                                        changed++;
                                }

                                // the raw has served its purpose; consume it
                                matched++;
                                list_del(raw_pos);
                                yfree((void **)&raw_pos);
                                break;
                        }

                        if (matched == job->dists_count)
                                break;
                }

                if (matched != job->dists_count) {
                        DWARN(""CHKID_FORMAT" can not found enough dist\n", CHKID_ARG(&job->chkid));
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }

                job->nochange = (changed == 0 && job->repnum_old == matched) ? 1 : 0;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Tell whether a node already holds a raw entry of the given chunk.
 *
 * @param node  node whose raw_list is searched
 * @param chkid chunk id to look for
 * @return 1 when a raw with an equal chkid is on the node, 0 otherwise
 */
int __chkid_in_node(b_node_t *node, chkid_t *chkid)
{
        struct list_head *cur;
        int present = 0;

        list_for_each(cur, &node->raw_list) {
                if (chkid_cmp(&((b_raw_t *)cur)->chkid, chkid) == 0) {
                        present = 1;
                        break;
                }
        }

        return present;
}

/**
 * Pick the eligible node with the largest or smallest weight.
 *
 * A node is skipped when
 *  - localize is on and it is the volume's local node,
 *  - @rack is given, at least three racks are known and the node is in
 *    another rack,
 *  - @skip_chkid is given and the node already holds that chunk.
 * Among the remaining nodes the first one with the extreme weight wins.
 *
 * @param balance    context providing node_list and the policies
 * @param node       out: selected node, NULL when none is eligible
 * @param max        non-zero selects the heaviest node, zero the lightest
 * @param rack       optional rack restriction (may be NULL)
 * @param skip_chkid optional chunk that must not already be on the node
 * @return 0 on success, ENOENT when no node is eligible
 */
int __get_weight_node(balance_t *balance, b_node_t **node, int max, char *rack, chkid_t *skip_chkid)
{
        int best_weight = -1;
        struct list_head *cur;
        b_node_t *cand, *best = NULL;

        list_for_each(cur, &balance->node_list) {
                cand = (b_node_t *)cur;

                if (balance->localize && strcmp(balance->local_node, cand->node) == 0)
                        continue;

                if (rack && balance->rack_count >= 3 && strcmp(cand->rack, rack) != 0)
                        continue;

                if (skip_chkid && __chkid_in_node(cand, skip_chkid))
                        continue;

                // a negative best_weight means "nothing selected yet"
                if (best_weight < 0
                    || (max && cand->weight > best_weight)
                    || (!max && cand->weight < best_weight)) {
                        best_weight = cand->weight;
                        best = cand;
                }
        }

        *node = best;

        return (best_weight < 0) ? ENOENT : 0;
}

/**
 * Select the lightest eligible node; see __get_weight_node() for the
 * eligibility rules and return values.
 */
int __get_weight_min_node(balance_t *balance, b_node_t **node, char *rack, chkid_t *skip_chkid)
{
        return __get_weight_node(balance, node, 0, rack, skip_chkid);
}

/**
 * Select the heaviest eligible node; see __get_weight_node() for the
 * eligibility rules and return values.
 */
int __get_weight_max_node(balance_t *balance, b_node_t **node, char *rack, chkid_t *skip_chkid)
{
        return __get_weight_node(balance, node, 1, rack, skip_chkid);
}

/**
 * Move one raw of the given replica index from the heavy node to the
 * light node.
 *
 * The first raw on @node_max that has the requested replica index and
 * whose chunk is not already present on @node_min is moved; both
 * weights are adjusted accordingly.
 *
 * @param replica  replica index being balanced
 * @param node_max node to take a raw from
 * @param node_min node to give the raw to
 * @return 0 when a raw was moved, ENOSPC when none qualifies
 */
int __move_max2min(int replica, b_node_t *node_max, b_node_t *node_min)
{
        int ret;
        b_raw_t *raw;
        struct list_head *cur, *next;

        list_for_each_safe(cur, next, &node_max->raw_list) {
                raw = (b_raw_t *)cur;

                if (raw->replica != replica || __chkid_in_node(node_min, &raw->chkid))
                        continue;

                list_move(cur, &node_min->raw_list);
                node_max->weight--;
                node_min->weight++;
                DINFO("mv "CHKID_FORMAT" replica: %d from %s(weight: %d) to %s(weight: %d)\n",
                        CHKID_ARG(&raw->chkid), raw->replica,
                        node_max->node, node_max->weight, node_min->node, node_min->weight);
                return 0;
        }

        // nothing on node_max can legally go to node_min
        ret = ENOSPC;
        DWARN("move_max2min error\n");
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/**
 * Step 3: even out node weights for one replica index.
 *
 * Repeatedly moves one raw from the heaviest to the lightest eligible
 * node until their weights differ by at most BLANCE_MAX_DIFF.
 *
 * @param balance context
 * @param replica replica index being balanced
 * @return 0 on success, otherwise the error of the node lookup or of
 *         __move_max2min()
 */
int __balance_replica_step3(balance_t *balance, int replica)
{
        int ret;
        b_node_t *heaviest;
        b_node_t *lightest;
        char *rack = balance->rack_of_vol[replica];

        for (;;) {
                ret = __get_weight_max_node(balance, &heaviest, rack, NULL);
                if (ret)
                        GOTO(err_ret, ret);

                ret = __get_weight_min_node(balance, &lightest, rack, NULL);
                if (ret)
                        GOTO(err_ret, ret);

                if ((heaviest->weight - lightest->weight) <= BLANCE_MAX_DIFF) {
                        DINFO("step3 ok\n");
                        return 0;
                }

                ret = __move_max2min(replica, heaviest, lightest);
                if (ret)
                        GOTO(err_ret, ret);
        }

err_ret:
        return ret;
}

/**
 * Step 2: place every still-unassigned raw of one replica index.
 *
 * Each matching raw left on balance->raw_list is moved to the lightest
 * eligible node that does not already hold the same chunk.
 *
 * @param balance context
 * @param replica replica index being balanced
 * @return 0 on success, otherwise the error of __get_weight_min_node()
 */
int __balance_replica_step2(balance_t *balance, int replica)
{
        int ret;
        b_raw_t *raw;
        b_node_t *target;
        struct list_head *cur, *next;

        list_for_each_safe(cur, next, &balance->raw_list) {
                raw = (b_raw_t *)cur;
                if (raw->replica == replica) {
                        ret = __get_weight_min_node(balance, &target,
                                                    balance->rack_of_vol[replica], &raw->chkid);
                        if (ret)
                                GOTO(err_ret, ret);

                        list_move(cur, &target->raw_list);
                        target->weight++;
                        DINFO("mv "CHKID_FORMAT" replica: %d to %s weight: %d\n",
                                        CHKID_ARG(&raw->chkid), raw->replica, target->node, target->weight);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Step 1 for localized volumes: pin every replica-0 raw to the local node.
 *
 * Only valid for replica index 0 (asserted). Each matching raw on
 * balance->raw_list is moved to the node whose name equals
 * balance->local_node.
 *
 * @param balance context
 * @param replica replica index, must be 0
 * @return 0 on success, ENOSPC when the local node is not in node_list
 */
int __balance_replica_step1_localize(balance_t *balance, int replica)
{
        int ret;
        b_raw_t *raw;
        b_node_t *node, *local;
        struct list_head *raw_pos, *node_pos, *raw_next;

        YASSERT(replica == 0);

        list_for_each_safe(raw_pos, raw_next, &balance->raw_list) {
                raw = (b_raw_t *)raw_pos;
                if (raw->replica != replica)
                        continue;

                // look up the local node by name
                local = NULL;
                list_for_each(node_pos, &balance->node_list) {
                        node = (b_node_t *)node_pos;
                        if (strcmp(balance->local_node, node->node) == 0) {
                                local = node;
                                break;
                        }
                }

                if (local == NULL) {
                        DWARN("can't found %s for localize\n", balance->local_node);
                        ret = ENOSPC;
                        GOTO(err_ret, ret);
                }

                list_move(raw_pos, &local->raw_list);
                local->weight++;
                DINFO("mv "CHKID_FORMAT" replica: %d to %s weight: %d\n",
                        CHKID_ARG(&raw->chkid), raw->replica, local->node, local->weight);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Step 1: keep every raw of one replica index on the node it names.
 *
 * For each matching raw on balance->raw_list the node with the same
 * name as raw->node is looked up. If that node already holds the chunk,
 * the lightest eligible node is used instead. Raws whose node is not in
 * node_list stay on balance->raw_list.
 *
 * @param balance context
 * @param replica replica index being balanced
 * @return 0 on success, otherwise the error of __get_weight_min_node()
 */
int __balance_replica_step1__(balance_t *balance, int replica)
{
        int ret;
        b_raw_t *raw;
        b_node_t *node, *target;
        struct list_head *raw_pos, *node_pos, *raw_next;

        list_for_each_safe(raw_pos, raw_next, &balance->raw_list) {
                raw = (b_raw_t *)raw_pos;
                if (raw->replica != replica)
                        continue;

                // find the node this raw currently names
                target = NULL;
                list_for_each(node_pos, &balance->node_list) {
                        node = (b_node_t *)node_pos;
                        if (strcmp(raw->node, node->node) == 0) {
                                target = node;
                                break;
                        }
                }

                if (target == NULL)
                        continue;

                // two replicas of one chunk must not share a node
                if (__chkid_in_node(target, &raw->chkid)) {
                        ret = __get_weight_min_node(balance, &target,
                                                    balance->rack_of_vol[replica], &raw->chkid);
                        if (ret)
                                GOTO(err_ret, ret);
                }

                list_move(raw_pos, &target->raw_list);
                target->weight++;
                DINFO("mv "CHKID_FORMAT" replica: %d to %s weight: %d\n",
                        CHKID_ARG(&raw->chkid), raw->replica, target->node, target->weight);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Balance all raws of one replica index.
 *
 * Replica 0 of a localized volume is only pinned to the local node and
 * the function returns right after that. Otherwise the three steps run
 * in order: keep raws on their current node, place the remaining ones,
 * then even out the weights. After step 2 of the last replica index the
 * unassigned list is asserted to be empty.
 *
 * @param balance context
 * @param replica replica index being balanced
 * @return 0 on success, otherwise the error of the failing step
 */
int __balance_replica(balance_t *balance, int replica)
{
        int ret;
        struct list_head *cur, *next;
        b_raw_t *left;

        if (balance->localize && (replica == 0))
                return __balance_replica_step1_localize(balance, replica);

        ret = __balance_replica_step1__(balance, replica);
        if (ret)
                GOTO(err_ret, ret);

        ret = __balance_replica_step2(balance, replica);
        if (ret)
                GOTO(err_ret, ret);

        // log whatever is still unassigned at this point
        list_for_each_safe(cur, next, &balance->raw_list) {
                left = (b_raw_t *)cur;
                DINFO("left:  "CHKID_FORMAT" replica: %d node: %s\n",
                                CHKID_ARG(&left->chkid), left->replica, left->node);
        }

        if (balance->repnum == replica + 1) {
                YASSERT(list_empty(&balance->raw_list));
        }

        ret = __balance_replica_step3(balance, replica);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Sanity-check the context before balancing.
 *
 * Requires at least repnum nodes, and requires that the number of
 * (replica, rack) pairs where rack_of_vol[replica] equals a known rack
 * name is exactly repnum.
 *
 * @param balance initialised context
 * @return 0 when both checks pass, ENOSPC otherwise
 */
int __check_balance(balance_t *balance)
{
        int i, ret, matches = 0;
        struct list_head *cur;

        // check node
        if (balance->repnum > balance->node_count) {
                ret = ENOSPC;
                DWARN("balance->repnum(%d) > balance->node_count(%d)\n",
                                balance->repnum, balance->node_count);
                GOTO(err_ret, ret);
        }

        // check rack
        for (i = 0; i < balance->repnum; i++) {
                list_for_each(cur, &balance->rack_list) {
                        if (strcmp(balance->rack_of_vol[i], ((b_rack_t *)cur)->rack) == 0)
                                matches++;
                }
        }

        if (matches != balance->repnum) {
                ret = ENOSPC;
                DWARN("no enough rack\n");
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/** Build raw_list and job_list for the chunks [begin, end) of a volume.
 *
 * For every chunk whose metadata can be read, one b_job_t is appended to
 * balance->job_list and balance->repnum b_raw_t entries (one per target
 * replica) are appended to balance->raw_list. A replica that does not
 * exist yet, or that sits in a rack different from rack_of_vol[replica],
 * gets the name of the lightest eligible node instead of its current one.
 * Chunks whose metadata lookup fails permanently are skipped.
 *
 * Elements already linked when an error occurs are left on the lists;
 * the caller is expected to release them with __free_balance().
 *
 * @param balance context; node_list must already be populated
 * @param pool    pool name passed to md_chunk_getinfo()
 * @param id      volume chkid
 * @param begin   first chunk index (inclusive)
 * @param end     last chunk index (exclusive)
 * @return 0 on success, otherwise the error of ymalloc() or
 *         __get_weight_min_node()
 */
int __init_balance_raw_list(balance_t *balance, const char *pool, const chkid_t *id, uint64_t begin, uint64_t end)
{
        int ret, replica;
        uint64_t i, need_new;
        chkid_t newid;
        b_job_t *job;
        b_raw_t *raw;
        b_node_t *node;
        char name[HOST_NAME_MAX];

        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo;
        chkinfo = (void *)_chkinfo;

        // raw chunk list
        for (i = begin; i < end; i++) {
                fid2cid(&newid, id, i);
        retry1:
                ret = md_chunk_getinfo(pool, NULL, &newid, chkinfo, NULL);
                if (ret) {
                        // transient errors are retried immediately, without back-off
                        if (ret == EAGAIN || ret == ENONET || ret == ENOSYS || ret == EBUSY) {
                                goto retry1;
                        } else {
                                if (ret != ENOENT) {
                                        DWARN(""CHKID_FORMAT" getinfo fail, ret: %d\n", CHKID_ARG(&newid), ret);
                                }
                                continue;
                        }
                }

                ret = ymalloc((void **)&job, sizeof(*job));
                if (ret)
                        GOTO(err_ret, ret);

                job->nochange = 0;
                job->chkid = newid;
                job->repnum_old = chkinfo->repnum;
                job->dists_count = balance->repnum;

                list_add_tail(&job->hook, &balance->job_list);
                balance->job_count++;

                for (replica = 0; replica < balance->repnum; replica++) {
                        // new raw
                        ret = ymalloc((void **)&raw, sizeof(*raw));
                        if (ret)
                                GOTO(err_ret, ret);

                        need_new = 0;
                        if (replica < chkinfo->repnum) {
                                strcpy(name, network_rname(&(chkinfo->diskid[replica].id)));
                                __make_sure_disk_forv3(name);
                                disk2rack(name, raw->rack);

                                // TODO
                                if (strcmp(balance->rack_of_vol[replica], raw->rack) != 0) {
                                        need_new = 1;
                                }
                        } else {
                                need_new = 1;
                        }

                        if (need_new) {
                                ret = __get_weight_min_node(balance, &node, balance->rack_of_vol[replica], &chkinfo->id);
                                if (ret) {
                                        // raw is not linked on any list yet, so the
                                        // caller's __free_balance() could not reclaim it
                                        yfree((void **)&raw);
                                        GOTO(err_ret, ret);
                                }

                                strcpy(name, node->node);
                                DINFO(""CHKID_FORMAT" add replica %d %s\n", CHKID_ARG(&newid), replica, name);
                        }

                        __make_sure_disk_forv3(name);

                        raw->chkid = newid;
                        raw->replica = replica;
                        // NOTE(review): for replica >= chkinfo->repnum this reads a
                        // diskid[] slot the metadata did not report as valid - confirm
                        raw->nid = chkinfo->diskid[replica].id;
                        disk2rack(name, raw->rack);
                        disk2node(name, raw->node);

                        list_add_tail(&raw->hook, &balance->raw_list);
                        balance->raw_count++;

                        job->dists[replica] = chkinfo->diskid[replica].id;
                }
        }

        return 0;
err_ret:
        return ret;
}

/** Derive the list of distinct racks from node_list.
 *
 * Every rack name that appears on a node and is not yet on rack_list is
 * appended as a new b_rack_t, and rack_count is incremented.
 *
 * @param balance context; node_list must already be populated
 * @return 0 on success, otherwise the error of ymalloc()
 */
int __init_balance_rack_list(balance_t *balance)
{
        int ret, known;
        b_node_t *node;
        b_rack_t *rack;
        struct list_head *node_pos, *rack_pos;

        list_for_each(node_pos, &balance->node_list) {
                node = (b_node_t *)node_pos;

                // is this node's rack already recorded?
                known = 0;
                list_for_each(rack_pos, &balance->rack_list) {
                        if (strcmp(node->rack, ((b_rack_t *)rack_pos)->rack) == 0) {
                                known = 1;
                                break;
                        }
                }

                if (known)
                        continue;

                // new rack
                ret = ymalloc((void **)&rack, sizeof(*rack));
                if (ret)
                        GOTO(err_ret, ret);

                strcpy(rack->rack, node->rack);

                list_add_tail(&rack->hook, &balance->rack_list);
                balance->rack_count++;
                DINFO("add rack, %s to rack: %s, count: %d\n", node->rack, rack->rack, balance->rack_count);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Copy @src into @dest and normalise it with __make_sure_disk_forv3().
 *
 * @param dest destination buffer; must be large enough for @src plus the
 *             possible "/0" suffix
 * @param src  NUL-terminated source name
 * @return always 0
 */
static inline int hostname(char *dest, const char *src)
{
        // strcpy() returns dest, which is then normalised in place
        (void)__make_sure_disk_forv3(strcpy(dest, src));

        return 0;
}

/** Build node_list from the cluster's node listing.
 *
 * Opens a listing session keyed by a freshly generated uuid, pages
 * through it with cluster_listnode() and appends one b_node_t per node
 * that
 *  - answers node_getinfo(),
 *  - is reported writeable by node_stat_writeable(), and
 *  - belongs to balance->site, unless cluster_storage_area_is_null()
 *    says the site is unset.
 * New nodes start with weight 0 and an empty raw_list. The session is
 * closed on every path once it has been opened.
 *
 * @param balance context; node_list must be initialised, site must be set
 * @return 0 on success, otherwise the error of cluster_listnode_open(),
 *         cluster_listnode() or ymalloc()
 */
int __init_balance_node_list(balance_t *balance)
{
        int ret, buflen, done = 0;
        char buf[MAX_BUF_LEN], tmp[MAX_BUF_LEN], name[HOST_NAME_MAX], site[MAX_NAME_LEN];
        // offset: running position passed to cluster_listnode();
        // offset2: cursor used by dir_for_each() inside the current buffer
        uint64_t offset = 0, offset2 = 0;
        struct dirent *de;
        nodeinfo_t info;
        b_node_t *node;
        uuid_t _uuid;
        char uuid[MAX_NAME_LEN] = {};

        // the textual uuid identifies this listing session
        uuid_generate(_uuid);
        uuid_unparse(_uuid, uuid);

        ret = cluster_listnode_open(uuid);
        if (ret)
                GOTO(err_ret, ret);

        while (done == 0) {
                memset(buf, 0, sizeof(buf));
                ret = cluster_listnode(buf, &buflen, uuid, offset);
                if (ret)
                        GOTO(err_close, ret);

                // an empty page ends the listing
                if (buflen == 0)
                        break;

                offset2 = 0;
                dir_for_each(buf, buflen, de, offset2) {
                        DINFO("node %s, balance->site: %s\n", de->d_name, balance->site);
                        if (strlen(de->d_name) == 0) {
                                // an entry with an empty name ends the whole listing
                                done = 1;
                                break;
                        } else if (buflen - offset2 < sizeof(*de) + MAX_NAME_LEN)
                                // not enough bytes left in this page; fetch the next one
                                // NOTE(review): the comparison mixes int and unsigned
                                // operands - confirm it cannot wrap
                                break;

                        // entry is consumed: the next page starts after it
                        offset += de->d_reclen;

                        // NOTE(review): info.stat presumably points into tmp - confirm
                        ret = node_getinfo(&info, de->d_name, tmp);
                        if (ret) {
                                continue;
                        }

                        if (!node_stat_writeable(info.stat->status)) {
                                continue;
                        }

                        // check site
                        strcpy(name, de->d_name);
                        __make_sure_disk_forv3(name);
                        disk2site(name, site);

                        if (!cluster_storage_area_is_null(balance->site)) {
                                if (strcmp(site, balance->site) != 0) {
                                        continue;
                                }
                        }

                        // new node
                        ret = ymalloc((void **)&node, sizeof(*node));
                        if (ret)
                                GOTO(err_close, ret);

                        node->nid = info.stat->nid;
                        node->weight = 0;
                        disk2rack(name, node->rack);
                        disk2node(name, node->node);
                        INIT_LIST_HEAD(&node->raw_list);

                        list_add_tail(&node->hook, &balance->node_list);
                        balance->node_count++;
                        DINFO("add node, %s, %s, count: %d\n", node->rack, node->node, balance->node_count);
                }
        }

        cluster_listnode_close(uuid);
        return 0;
err_close:
        cluster_listnode_close(uuid);
err_ret:
        return ret;
}


/**
 * Log the volume id and the policy fields of a balance context.
 *
 * @param balance context to describe; not modified
 */
static void __dump_balance(const balance_t *balance)
{
        DINFO("balance_t(chkid: "CHKID_FORMAT", site: %s, local_node: %s, localize: %d, repnum: %d)\n",
              CHKID_ARG(&balance->chkid), balance->site, balance->local_node,
              balance->localize, balance->repnum);
}


/**
 * Initialise a balance context for the chunks [begin, end) of a volume.
 *
 * The context is zeroed first, so every name buffer is a valid empty
 * string even when the volume currently has fewer replicas than the
 * target. The function then reads the volume's attributes and chunk
 * info, derives the target replica count, records the rack of every
 * existing replica and the node of replica 0, and finally builds
 * node_list, rack_list, raw_list and job_list.
 *
 * The list heads are initialised before anything can fail, so the
 * caller may always hand the context to __free_balance().
 *
 * @param balance context to fill in
 * @param pool    pool name
 * @param id      volume chkid
 * @param begin   first chunk index (inclusive)
 * @param end     last chunk index (exclusive)
 * @return 0 on success, otherwise the error of the failing metadata
 *         call or list builder
 */
int __init_balance(balance_t *balance, const char *pool, const chkid_t *id, uint64_t begin, uint64_t end)
{
        int i, ret, retry;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX], name[HOST_NAME_MAX];
        fileinfo_t fileinfo;

        chkinfo = (void *)_chkinfo;

        // the caller's context lives on the stack: clear counters, policies
        // and all name buffers (rack_of_vol[] is only partly filled below)
        memset(balance, 0, sizeof(*balance));
        balance->chkid = *id;
        INIT_LIST_HEAD(&balance->rack_list);
        INIT_LIST_HEAD(&balance->node_list);
        INIT_LIST_HEAD(&balance->job_list);
        INIT_LIST_HEAD(&balance->raw_list);

        retry = 0;
retry:
        ret = md_getattr(id, &fileinfo);
        if (ret) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ret = md_chunk_getinfo(pool, NULL, id, chkinfo, NULL);
        if (ret)
                GOTO(err_ret, ret);

        // target repnum
        if (id->type == __RAW_CHUNK__) {
                balance->repnum = fileinfo.repnum_usr;
        } else {
                if (fileinfo.repnum_sys == 0)
                        balance->repnum = gloconf.metadata_replica;
                else
                        balance->repnum = fileinfo.repnum_sys;
        }

        YASSERT(balance->repnum);

        // under-replicated volume: only warn, balancing still proceeds
        if (chkinfo->repnum < balance->repnum) {
                DWARN(""CHKID_FORMAT" need recovery replica\n", CHKID_ARG(id));
        }

        // rack of every existing replica; replica 0 also defines the local node
        for (i = 0; i < chkinfo->repnum; i++) {
                strcpy(name, network_rname(&(chkinfo->diskid[i].id)));
                __make_sure_disk_forv3(name);
                disk2rack(name, balance->rack_of_vol[i]);
                DINFO("name: %s, rack_of_vol: %s\n", name, balance->rack_of_vol[i]);
                if (i == 0) {
                        disk2node(name, balance->local_node);
                }
        }

        balance->localize = is_volume_localize(&fileinfo);

        ret = md_chunk_getsite(pool, id, balance->site);
        if (ret)
                GOTO(err_ret, ret);

        ret = __init_balance_node_list(balance);
        if (ret)
                GOTO(err_ret, ret);

        ret = __init_balance_rack_list(balance);
        if (ret)
                GOTO(err_ret, ret);

        __dump_balance(balance);

        // NOTE(review): an unconditional "ret = EPERM; GOTO(err_ret, ret);"
        // used to sit here and made the raw/job list construction below
        // unreachable. It was removed as a debugging leftover - confirm it
        // was not meant as a deliberate kill switch.
        ret = __init_balance_raw_list(balance, pool, id, begin, end);
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/** Balance one range of chunks of a volume and emit the resulting jobs.
 *
 * Runs the whole pipeline for [begin, end): initialise the context,
 * sanity-check it, balance every replica index, generate the jobs and
 * write them to arg->fd. The context is released on every path.
 *
 * NOTE(review): a failing stage is only reported through GOTO; the
 * function returns 0 on the error path as well, so callers never see
 * the failure. Confirm this best-effort behaviour is intended.
 *
 * @param arg   output descriptor and counters
 * @param pool  pool name
 * @param chkid volume chkid
 * @param begin first raw chunk index (inclusive)
 * @param end   last raw chunk index (exclusive)
 * @return always 0
 */
static int __balance_slice(arg_t *arg, const char *pool, const chkid_t *chkid, uint64_t begin, uint64_t end)
{
        int ret, replica;
        balance_t ctx;

        DINFO(""CHKID_FORMAT": [%llu, %llu)\n", CHKID_ARG(chkid), (LLU)begin, (LLU)end);

        ret = __init_balance(&ctx, pool, chkid, begin, end);
        if (ret)
                GOTO(err_free, ret);

        ret = __check_balance(&ctx);
        if (ret)
                GOTO(err_free, ret);

        replica = 0;
        while (replica < ctx.repnum) {
                ret = __balance_replica(&ctx, replica);
                if (ret)
                        GOTO(err_free, ret);
                replica++;
        }

        ret = __gen_jobs(&ctx);
        if (ret)
                GOTO(err_free, ret);

        ret = __write_jobs(&ctx, arg);
        if (ret)
                GOTO(err_free, ret);

        // success and failure share the same cleanup and the same result
err_free:
        __free_balance(&ctx);
        return 0;
}

/** Balance a whole volume, JOBS_SLICE chunks at a time.
 *
 * The chunk count is derived from the volume's size and EC settings;
 * the range [0, chknum) is then handed to __balance_slice() in pieces
 * of at most JOBS_SLICE chunks.
 *
 * @param pool  pool name
 * @param chkid volume chkid
 * @param arg   output descriptor and counters, passed through
 * @return 0 on success, otherwise the error of md_getattr() or
 *         __balance_slice()
 */
static int balance_volume(const char *pool, const chkid_t *chkid, arg_t *arg)
{
        int ret;
        uint64_t total, start, count;
        fileinfo_t fileinfo;

        ret = md_getattr(chkid, &fileinfo);
        if (ret)
                GOTO(err_ret, ret);

        // TODO xrange
        total = size2chknum(fileinfo.size, &fileinfo.ec);
        for (start = 0; start < total; start += count) {
                // the last slice may be shorter than JOBS_SLICE
                count = total - start;
                if (count > JOBS_SLICE)
                        count = JOBS_SLICE;

                ret = __balance_slice(arg, pool, chkid, start, start + count);
                if (ret)
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Public entry point: generate the balance jobs of one volume.
 *
 * @param pool  pool name
 * @param chkid volume chkid
 * @param arg   output descriptor and counters
 * @return 0 on success, otherwise the error of balance_volume()
 */
int gen_jobs(const char *pool, const chkid_t *chkid, arg_t *arg)
{
        int ret = balance_volume(pool, chkid, arg);

        if (ret)
                GOTO(err_ret, ret);

        DINFO("gen jobs ok\n");

        return 0;
err_ret:
        return ret;
}
